<!DOCTYPE html>
<html lang="en">

<head>
  <meta charset="utf-8">
  <meta http-equiv="X-UA-Compatible" content="IE=edge">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  
  <title>DtCraft - High-performance Cluster Computing Engine</title>
  <base href="../">

  <link rel="icon" href="images/favicon.ico">

  <!-- Bootstrap core CSS -->
  <link rel="stylesheet" href="css/bootstrap.min.css">
  <link rel="stylesheet" href="css/dtcraft.css">
  <script type="text/javascript" src="js/jquery-3.2.1.min.js"></script>
  <script type="text/javascript" src="js/bootstrap.min.js"></script>
  <script type="text/javascript" src="js/run_prettify.js"></script>

</head>


<body>

<div class="container">

  <script src="js/dtcraft-navbar.js?random=<?php echo uniqid(); ?>"></script>

  <div class="content">
  
  <h1>Stream Graph</h1>
  <p>In this chapter, we will start to learn DtCraft's <em>stream graph</em> programming model
  and deploy applications on a cluster.
  The idea of streaming is analogous to an assembly pipeline, where one end produces data
  and the other end takes action on the data.
  In real applications, data streams come continuously and indefinitely,
  and we need to compute the data soon after they arrive.
  Our stream graph programming model is developed based on this behavior.
  </p>

  <p>A stream graph consists of three key components:
  <em>vertex</em>, <em>stream</em>, and <em>container</em>.
  A vertex synchronizes all adjacent streams and stores application-specific data at runtime.
  A stream connects two vertices and invokes a computation callback when data arrive.
  A container requests resources (cpu, memory) for your program.
  The class <code>Graph</code> is the main entry to create these components.
  </p>

<pre><code class="prettyprint lang-cpp">class Graph {
  VertexBuilder vertex();
  StreamBuilder stream(key_type, key_type);
  ContainerBuilder container(); 
};
</code></pre>

  <p>Each graph component is associated with a <em>unique integer key</em> 
  which is <em>deterministically</em> consistent throughout
  an application's lifetime.
  Since a graph may be partitioned into different portions running on different machines, 
  we use these keys to enable <em>partial</em> execution and decide which component goes to which partition.
  </p>
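
  <h5>Sketch: Deterministic Keys</h5>
  <p>To see why deterministic keys matter, consider the minimal sketch below.
  It is not DtCraft's implementation: <code>KeyGenerator</code> and <code>build_keys</code>
  are hypothetical names, and <code>key_type</code> is assumed to be an integer.
  Because the keys depend only on the order in which components are created,
  two processes that build the same graph arrive at the same keys without exchanging any messages.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;cstddef&gt;
#include &lt;vector&gt;

using key_type = int;  // assumption: the real key_type may differ

// Hypothetical generator: a key depends only on the creation order.
class KeyGenerator {
  public:
    key_type next() { return _counter++; }
  private:
    key_type _counter {0};
};

// Builds the keys of a graph with n components, as every process would.
std::vector&lt;key_type&gt; build_keys(std::size_t n) {
  KeyGenerator gen;
  std::vector&lt;key_type&gt; keys;
  for(std::size_t i=0; i&lt;n; ++i) {
    keys.push_back(gen.next());
  }
  return keys;
}
</code></pre>

  <p>Calling <code>build_keys(3)</code> twice yields the same sequence both times, which is the property
  a partitioned execution relies on.</p>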

  <h3>Vertex</h3>

  <p>Vertex represents a persistent storage to interact with incoming and outgoing streams.
  It has a constructor-like callback to synchronize adjacent streams,
  and a public <code>std::any</code> object to store application-specific data.
  This is similar to C-style <code>void*</code> but safer in type conversion.
  You can access a particular input stream or output stream at a vertex given a stream key.
  In most situations, we only obtain the output stream from a vertex to write data.
  The source code of vertex can be found in <code>include/dtc/kernel/vertex.hpp</code>
  and <code>src/kernel/vertex.cpp</code>.
  The code below lists the methods commonly called on a vertex.</p>

<pre><code class="prettyprint lang-cpp">class Vertex {
  const key_type key;  // key of this vertex
  std::any any;        // store your data of any types

  std::shared_ptr&lt;InputStream&gt; istream(key_type key) const;
  std::shared_ptr&lt;OutputStream&gt; ostream(key_type key) const;

  void remove_ostream(key_type key) const;
  void remove_istream(key_type key) const;
  
  template &lt;typename... T&gt;
  void broadcast(T&&...) const;
  
  template &lt;typename K, typename... T&gt;
  void broadcast_to(K&& keys, T&&...) const;
};
</code></pre>
  
  <p>
  Most methods are easily understandable by their names.
  <ul>
  <li><code>istream</code> returns a shared pointer to the input stream event at a given key,
  or <code>nullptr</code> if not accessible from this vertex.</li>
  <li><code>ostream</code> returns a shared pointer to the output stream event at a given key,
  or <code>nullptr</code> if not accessible from this vertex.</li>
  <li><code>remove_ostream</code> removes an output stream event from this vertex at a given key.</li>
  <li><code>remove_istream</code> removes an input stream event from this vertex at a given key.</li>
  <li><code>broadcast</code> writes data to all output streams.</li>
  <li><code>broadcast_to</code> writes data to those output streams specified in a vector of keys.</li>
  </ul>
  </p>
	
  <p>Performing stream operations through a vertex only makes sense for adjacent streams.
  Operations on streams that do not belong to the vertex are skipped.
  Closing one end of a stream will automatically force the other end to close.
  When a stream is closed, accessing it will return <code>nullptr</code>.</p>


  <h5>Example: Access an Output Stream at a Vertex</h5>
  <p>The example below writes an integer <code>123456</code> through the output stream <code>key</code> 
  at the vertex <code>v</code>, and then closes the stream.</p>
<pre><code class="prettyprint lang-cpp">auto callback = [key] (dtc::Vertex& v) {
  if(auto os = v.ostream(key); os) {
    (*os)(123456);
    v.remove_ostream(key);  // removing the output stream closes it
  }
  else {
    std::cout &lt;&lt; "stream " &lt;&lt; key &lt;&lt; " doesn't exist or is closed\n";
  }
};
</code></pre>
  
  <h5>Example: Broadcast at a Vertex</h5>
  <p>The example below writes an integer <code>123456</code> through all output streams 
  at the vertex <code>v</code>.</p>
<pre><code class="prettyprint lang-cpp">auto callback = [] (dtc::Vertex& v) {
  v.broadcast(123456);
};
</code></pre>
  
  <div class="info notes">
  <p><b>Note:</b> Removing an input or an output stream is an irreversible operation.
  A closed stream cannot be reopened.</p>
  </div>
  
  <h3>Stream</h3>

  <p>Stream is the heart of a DtCraft application. Each stream is associated with a pair of vertices
  and computation callbacks at both ends.
  The vertex on the output side of a stream is referred to as the <em>tail</em>,
  and the vertex on the input side as the <em>head</em>.
  We do not allow direct access to a stream object but decompose it to a vertex and an I/O stream event.
  A stream can be either an <em>inter-stream</em> or an <em>intra-stream</em>, depending on the partition
  layout of the scheduler.
  We have unified the callback for both cases.
  The callback of a stream has the following two forms: </p>

<pre><code class="prettyprint lang-cpp">std::function&lt;Event::Signal(Vertex&, OutputStream&)&gt;;  // ostream callback
std::function&lt;Event::Signal(Vertex&, InputStream&)&gt;;   // istream callback
</code></pre>

  <p>The input stream callback is invoked when data arrive and the output stream callback is invoked 
  when data are sent. The kernel autonomously polls to synchronize the stream buffer with the associated
  device before handing control to the stream callback.
  Inside the callback, you can safely assume that
  the stream has been properly synchronized.
  The return value of the callback is an event signal, which can be either <code>dtc::Event::DEFAULT</code> 
  or <code>dtc::Event::REMOVE</code>.
  Returning <code>dtc::Event::REMOVE</code> from a callback will force the kernel 
  to remove the stream event.
  </p>
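
  <h5>Sketch: How a Return Signal Could Drive an Event Loop</h5>
  <p>The effect of the two signals can be pictured with the toy loop below.
  It is a sketch under stated assumptions, not the DtCraft kernel:
  the <code>Signal</code> enum only stands in for <code>dtc::Event::Signal</code>,
  and <code>poll_once</code> is a hypothetical helper.
  A callback that returns <code>REMOVE</code> is dropped, while one that returns <code>DEFAULT</code>
  stays registered and is invoked again on the next round.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;cstddef&gt;
#include &lt;functional&gt;
#include &lt;vector&gt;

// Toy stand-in for dtc::Event::Signal (assumption, for illustration only).
enum class Signal { DEFAULT, REMOVE };

// Invokes every registered callback once and drops those that ask for removal.
// Returns the number of callbacks that remain registered.
std::size_t poll_once(std::vector&lt;std::function&lt;Signal()&gt;&gt;&amp; events) {
  for(auto it = events.begin(); it != events.end(); ) {
    if((*it)() == Signal::REMOVE) {
      it = events.erase(it);
    }
    else {
      ++it;
    }
  }
  return events.size();
}
</code></pre>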

  <h5>Example: Extract Data From an Input Stream</h5>
  <p>The example below defines an input stream callback that extracts 
  a vector of integers from an input stream event.</p>

<pre><code class="prettyprint lang-cpp">auto callback = [] (dtc::Vertex& head, dtc::InputStream& is) {
  if(std::vector&lt;int&gt; data; is(data) != -1) {
    std::cout &lt;&lt; "Received a vector of integers at vertex " &lt;&lt; head.key &lt;&lt; '\n';
    return dtc::Event::REMOVE; // remove this stream event
  }
  return dtc::Event::DEFAULT;  // incomplete data, try again later
};
</code></pre>
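
  <h5>Sketch: Why an Extraction Can Be Incomplete</h5>
  <p>The "try again later" branch above exists because a stream may deliver only part of an object.
  The sketch below illustrates the idea on a plain byte buffer.
  The layout (an element count followed by the elements) and the helper <code>try_extract</code>
  are assumptions made for this illustration and do not describe DtCraft's wire format.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;cstddef&gt;
#include &lt;cstring&gt;
#include &lt;vector&gt;

// Tries to decode a vector of integers stored as [count][count ints] in buf.
// Returns the number of bytes consumed, or -1 if buf does not yet hold
// a complete vector. The layout is an assumption of this sketch.
long try_extract(const std::vector&lt;char&gt;&amp; buf, std::vector&lt;int&gt;&amp; out) {
  std::size_t count = 0;
  if(buf.size() &lt; sizeof(count)) {
    return -1;  // the count itself has not fully arrived
  }
  std::memcpy(&amp;count, buf.data(), sizeof(count));
  const std::size_t total = sizeof(count) + count * sizeof(int);
  if(buf.size() &lt; total) {
    return -1;  // incomplete data, try again when more bytes arrive
  }
  out.resize(count);
  if(count &gt; 0) {
    std::memcpy(out.data(), buf.data() + sizeof(count), count * sizeof(int));
  }
  return static_cast&lt;long&gt;(total);
}
</code></pre>

  <p>A callback built on such a helper would return <code>dtc::Event::DEFAULT</code> on <code>-1</code>
  and act on the data otherwise.</p>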
  
  <div class="info notes">
  <p><b>Note:</b> Multiple stream events can happen at the same time. It is the user's responsibility to
  avoid data races when computation needs to synchronize on vertex storage, for example, in a parallel
  reduction.</p>
  </div>
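
  <h5>Sketch: Guard Shared Vertex Storage with a Mutex</h5>
  <p>One common way to avoid such a data race is to protect the shared storage with a mutex.
  The sketch below uses only the C++ standard library. <code>Storage</code>, <code>accumulate</code>,
  and <code>parallel_sum</code> are hypothetical names, and plain threads play the role of
  stream callbacks that run concurrently.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;any&gt;
#include &lt;mutex&gt;
#include &lt;thread&gt;
#include &lt;vector&gt;

// Shared storage playing the role of a vertex's any object (illustrative).
struct Storage {
  std::mutex mutex;
  std::any any {0};  // holds an int running sum
};

// Adds value to the running sum while holding the lock.
void accumulate(Storage&amp; s, int value) {
  std::lock_guard&lt;std::mutex&gt; lock(s.mutex);
  s.any = std::any_cast&lt;int&gt;(s.any) + value;
}

// Runs num_threads concurrent "callbacks"; each adds 1 adds_per_thread times.
int parallel_sum(int num_threads, int adds_per_thread) {
  Storage s;
  std::vector&lt;std::thread&gt; threads;
  for(int t=0; t&lt;num_threads; ++t) {
    threads.emplace_back([&amp;s, adds_per_thread] {
      for(int i=0; i&lt;adds_per_thread; ++i) {
        accumulate(s, 1);
      }
    });
  }
  for(auto&amp; t : threads) {
    t.join();
  }
  return std::any_cast&lt;int&gt;(s.any);
}
</code></pre>

  <p>Without the lock, the read-modify-write in <code>accumulate</code> could interleave across threads
  and lose updates.</p>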


  <h3>Container</h3>
	<p>DtCraft implements a lightweight Linux container to isolate resources of a graph.
  Unlike existing frameworks, our container interface allows you to request different resources for
  different portions of a graph. Currently we support cpu and memory resources.
  Resources must be requested on a vertex basis. The kernel scheduler partitions the graph
  based on these requests into <em>topologies</em> and deploys them on remote machines.
  By default, DtCraft creates one container to include unassigned vertices.
  </p>  


  <h1>Graph Builder</h1>

  <p>A graph component is not instantiated when you create it;
  instantiation is deferred until runtime.
  Instead, you get a <em>builder</em> object that stands for that particular graph component.
  Each builder object provides a rich set of member functions to initialize the associated attributes.
  The source of graph builder can be found in <code>src/kernel/graph.cpp</code> and 
  <code>include/dtc/kernel/graph.hpp</code>.
  </p>

  <h3>Vertex Builder</h3>
  <p>Vertex builder <code>VertexBuilder</code> lets you initialize the attributes of a vertex.</p>

<pre><code class="prettyprint lang-cpp">class VertexBuilder {
  const key_type key;  // key of the associated vertex
  
  operator key_type() const; 
  
  template &lt;typename C&gt;
  VertexBuilder& on(C&& c);

  VertexBuilder& tag(std::string str);
  VertexBuilder& program(std::string command);
};
</code></pre>
  
  <p>
  Each builder method returns a reference to itself similar to the JavaScript builder pattern.
  <ul>
  <li><code>operator key_type</code> implicitly converts the builder to the vertex key.</li>
  <li><code>on</code> sets the associated callback to <code>c</code>, convertible to
  <code>std::function&lt;void(dtc::Vertex&)&gt;</code>.</li>
  <li><code>tag</code> sets the associated tag to <code>str</code>.</li>
  <li><code>program</code> sets the vertex as an external program supplied by <code>command</code>.</li>
  </ul>
  </p>

  <p>Multiple calls to the same method will cause previous values to be replaced by the newest one.</p>
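
  <h5>Sketch: Method Chaining and Value Replacement</h5>
  <p>The chaining and replacement behavior described above can be pictured with a toy builder.
  <code>ToyBuilder</code> and <code>make_example</code> are hypothetical and only mimic the pattern;
  they are not DtCraft's <code>VertexBuilder</code>.
  Each setter returns <code>*this</code>, so calls can be chained, and a later call to the same setter
  overwrites the earlier value.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;string&gt;
#include &lt;utility&gt;

// Toy builder (not DtCraft's VertexBuilder): every setter returns *this.
class ToyBuilder {
  public:
    ToyBuilder&amp; tag(std::string str) { _tag = std::move(str); return *this; }
    ToyBuilder&amp; program(std::string cmd) { _program = std::move(cmd); return *this; }

    const std::string&amp; tag() const { return _tag; }
    const std::string&amp; program() const { return _program; }

  private:
    std::string _tag;
    std::string _program;
};

// Chains three calls; the second call to tag replaces the first value.
ToyBuilder make_example() {
  ToyBuilder b;
  b.tag("first").program("/bin/ls -al").tag("second");
  return b;
}
</code></pre>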


  <h5>Example: Set a Vertex Callback</h5>
  <p>The example below creates a vertex and assigns a callback to
  initialize the any object to a vector of integers.</p>

<pre><code class="prettyprint lang-cpp">auto vertex = G.vertex().on(
  [] (dtc::Vertex& v) {
    v.any = std::vector&lt;int&gt;();
  }
);
</code></pre>
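
  <h5>Sketch: Read Data Back from a <code>std::any</code></h5>
  <p>Once the any object holds a vector, later callbacks need to get it back with the right type.
  The sketch below uses only the standard library. <code>append</code> is a hypothetical helper,
  and its <code>storage</code> parameter plays the role of <code>v.any</code>.
  The pointer form of <code>std::any_cast</code> returns <code>nullptr</code> on a type mismatch instead
  of throwing, which is what makes this safer than casting a <code>void*</code>.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;any&gt;
#include &lt;vector&gt;

// Appends value to the vector held in storage.
// Returns false and leaves storage untouched if it holds another type.
bool append(std::any&amp; storage, int value) {
  if(auto vec = std::any_cast&lt;std::vector&lt;int&gt;&gt;(&amp;storage); vec) {
    vec-&gt;push_back(value);
    return true;
  }
  return false;
}
</code></pre>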
  
  <h5>Example: Set a Vertex Program</h5>
  <p>The example below creates a vertex program that executes the command <code>/bin/ls -al</code>.</p>

<pre><code class="prettyprint lang-cpp">auto vertex = G.vertex().on(
  [] (dtc::Vertex& v) {
    std::cout &lt;&lt; "Ready to execute '/bin/ls -al'\n";
  }
).program("/bin/ls -al");
</code></pre>
  
  
	
  
  <h3>Stream Builder</h3>
  <p>Stream builder <code>StreamBuilder</code> lets you initialize the attributes of a stream.</p>

<pre><code class="prettyprint lang-cpp">class StreamBuilder {
  const key_type key;  // key of the associated stream
  
  operator key_type() const; 
  
  template &lt;typename C&gt;
  StreamBuilder& on(C&& c);

  StreamBuilder& critical(bool flag);
  StreamBuilder& tag(std::string str);
};
</code></pre>
  
  <p>
  Each builder method returns a reference to itself similar to the JavaScript builder pattern.
  <ul>
  <li><code>operator key_type</code> implicitly converts the builder to the stream key.</li>
  <li><code>on</code> sets the associated callback to <code>c</code>, convertible to
  either <code>std::function&lt;dtc::Event::Signal(dtc::Vertex&, dtc::InputStream&)&gt;</code> 
  or <code>std::function&lt;dtc::Event::Signal(dtc::Vertex&, dtc::OutputStream&)&gt;</code> 
  at compile time.</li>
  <li><code>critical</code> sets the stream to be critical. When a critical stream is closed,
  the entire application will terminate.</li>
  <li><code>tag</code> sets the associated tag to <code>str</code>. This is useful when the stream
  is connected to a vertex program.</li>
  </ul>
  </p>

  <p>Multiple calls to the same method will cause previous values to be replaced by the newest one.
  In most cases, we only set the input stream callback and ignore the output stream callback.</p>

  <h5>Example: Set a Stream Callback</h5>
  <p>The example below creates a stream and assigns an input stream callback. The callback prints
  the number of bytes that are immediately available to read and then closes the stream.</p>

<pre><code class="prettyprint lang-cpp">auto tail = G.vertex();
auto head = G.vertex();
auto edge = G.stream(tail, head).on(
  [] (dtc::Vertex& v, dtc::InputStream& is) {
    std::cout &lt;&lt; "stream has " &lt;&lt; is.isbuf.in_avail() &lt;&lt; " bytes available to read\n";
    return dtc::Event::REMOVE;
  }
);
</code></pre>
  
  <h3>Container Builder</h3>
  <p>Container builder <code>ContainerBuilder</code> lets you initialize the attributes of a container,
  in terms of system resources such as cpu count and memory limit. 
  The DtCraft kernel creates a lightweight Linux container to isolate your application.
  Containers take effect only in <em>distributed</em> mode, where a graph is partitioned into topologies based
  on a user-specified container layout.
  Local mode simply runs the program under the user's default resource limits.</p>

<pre><code class="prettyprint lang-cpp">class ContainerBuilder {
  const key_type key;  // key of the associated container
  
  operator key_type() const; 
 
  ContainerBuilder& add(key_type key);
  ContainerBuilder& cpu(uintmax_t num);
  ContainerBuilder& memory(uintmax_t bytes);
  ContainerBuilder& space(uintmax_t bytes);
  ContainerBuilder& host(std::string hostname);
};
</code></pre>
  
  <p>
  Each builder method returns a reference to itself similar to the JavaScript builder pattern.
  <ul>
  <li><code>operator key_type</code> implicitly converts the builder to the container key.</li>
  <li><code>add</code> adds the vertex <code>key</code> to this container.</li>
  <li><code>cpu</code> assigns <code>num</code> cpus to this container.</li>
  <li><code>memory</code> assigns <code>bytes</code> of memory to this container.</li>
  <li><code>space</code> assigns <code>bytes</code> of disk space to this container.</li>
  <li><code>host</code> forces this container to run on the machine <code>hostname</code>.</li>
  </ul>
  </p>

  <p>A container holds only vertices because stream connections can be determined from
  the vertex layout.
  A stream that crosses two containers is called an <em>inter-stream</em> and often has higher latency
  than a stream inside a single container, called an <em>intra-stream</em>.
  The DtCraft kernel deploys a graph based on the container layout.</p>
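
  <h5>Sketch: Classify a Stream from the Vertex Layout</h5>
  <p>The sketch below shows how the kind of a stream follows from the vertex layout alone.
  It is illustrative only: <code>classify</code> and <code>layout</code> are hypothetical names,
  <code>key_type</code> is assumed to be an integer, and the actual scheduler logic may differ.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;string&gt;
#include &lt;unordered_map&gt;

using key_type = int;  // assumption: the real key_type may differ

// Classifies a stream by the containers of its two end vertices.
// layout maps a vertex key to the key of the container that holds it.
// Throws std::out_of_range if a vertex is missing from the layout.
std::string classify(
  const std::unordered_map&lt;key_type, key_type&gt;&amp; layout,
  key_type tail,
  key_type head
) {
  return layout.at(tail) == layout.at(head) ? "intra-stream" : "inter-stream";
}
</code></pre>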
  
  <h5>Example: Containerize Vertices in a Graph</h5>
  <p>The example below creates two containers for two vertices, respectively, in a graph,
  and requests 1 cpu and 1 GB memory for each.</p>

<pre><code class="prettyprint lang-cpp">using namespace dtc::literals;
auto A = G.vertex();
auto B = G.vertex();
auto C1 = G.container().add(A).cpu(1).memory(1_GB);
auto C2 = G.container().add(B).cpu(1).memory(1_GB);
</code></pre>
  
	

  <h1>Vertex Program</h1>

  <p>DtCraft supports the execution of external programs in a graph.
  This is particularly useful when you want to create a new stream application on top of
  existing or third-party programs.
  When a vertex is specified as an external program, the kernel spawns a new program supplied by
  the given command. 
  Connections are passed to the program through the environment variable <code>DTC_BRIDGES</code>,
  where each key is a stream tag and each value is the associated file descriptor.
  You can retrieve these handles and establish stream channels using your own libraries or our I/O stream API.
  The file descriptor passed to an external program is restored to <em>blocking</em>
  and <em>open-on-exec</em>.</p>
  
  <h5>Example: Execute an External Program in a Vertex</h5>
  <p>The example below demonstrates how to execute an external program and get the stream handle
  from its environment variable.
  When the external program finishes, it closes all adjacent streams which in turn forces all other ends
  to close.</p>

<pre><code class="prettyprint lang-cpp">// Graph program.
auto v = G.vertex().program("demo");
auto u = G.vertex();
auto s = G.stream(u, v).tag("some_id_here");
</code></pre>

<pre><code class="prettyprint lang-cpp">// Demo program
std::cout &lt;&lt; ::getenv("DTC_BRIDGES") &lt;&lt; '\n';
</code></pre>
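
  <h5>Sketch: Read the Environment Variable Defensively</h5>
  <p>When the demo program is started by hand rather than by the kernel, <code>DTC_BRIDGES</code>
  may be unset, and <code>getenv</code> then returns a null pointer that must not be streamed
  to <code>std::cout</code>.
  The sketch below guards against that case. <code>env_or</code> is a hypothetical helper.
  The exact format of the value is not specified here, so the sketch does not attempt to parse it.</p>

<pre><code class="prettyprint lang-cpp">#include &lt;cstdlib&gt;
#include &lt;string&gt;

// Returns the value of the environment variable name,
// or fallback if the variable is not set.
std::string env_or(const char* name, const std::string&amp; fallback) {
  const char* value = std::getenv(name);
  return value ? std::string(value) : fallback;
}
</code></pre>

  <p>A demo program could then print <code>env_or("DTC_BRIDGES", "(not set)")</code> regardless of how
  it was launched.</p>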
  
  <h1>Execute a Stream Graph</h1>

  <p>After you have created a stream graph, you can run it in either <em>local</em> mode 
  or <em>distributed</em> mode.
  Local mode runs the graph as a normal process on your operating system, and distributed mode
  deploys your graph to remote machines.
  To execute a stream graph,
  you must create a <code>dtc::Executor</code> object from your graph and call the method <code>run</code>.
  The executor takes care of runtime controls and interacts with the kernel to deploy your graph.</p>

  
  <h5>Example: Create an Executor to Run a Stream Graph</h5>
  <p>The example below demonstrates how to create an executor to execute a stream graph.</p>

<pre><code class="prettyprint lang-cpp">dtc::Executor(G).run();
</code></pre>
  
  <div class="info notes">
  <p><b>Note:</b> Currently, DtCraft supports only a single graph in one application. 
  Defining more than one stream graph in your program can result in undefined behavior.</p>
  </div>

  <h3>Run a Stream Graph Locally</h3>

  <p>Running a stream graph in local mode is the same as running a binary from your local console.
  After the program is successfully compiled, simply type in <code>./program_name</code> and pass the 
  argument list you defined. Local mode does not start any master or agent.
  It is always a good idea to ensure your graph runs correctly in local mode
  before submitting it to a cluster.</p>

	<h3>Submit a Stream Graph to a Cluster</h3>
  <p>The <code>submit</code> script in DtCraft's <code>sbin</code> directory is used to help
  submit an application to a cluster.
  Please refer to <a href="quickstart.html">QuickStart</a> for how to run a stream graph in 
  distributed mode.</p>
	
  <h1>Where to Go from Here?</h1>
  
  <p>Congratulations! You have just learnt to create a DtCraft application using our stream graph
  programming model. Please refer to <code>examples/</code> for more stream graph examples in practice.
   </p>

  </div>  <!-- end of content -->
  
  <script src="js/dtcraft-footer.js"></script>

</div>

</body>


</html>


